iT邦幫忙

2023 iThome 鐵人賽

DAY 15
0

摩爾定律 所賜,現在的計算資源越來越強,有越來越多的程式可以分散地、平行化 (parallelism) 的運行,平行運行下的程式勢必會遇到諸如 race condition、deadlock 或難以測試的問題,之後幾天我們會使用 pure function 來建立支援平行化和異步計算的 library,除此之外,我們還可以學習從 functional programming 的角度應對平行運行時的設計思維,

pure function 的好處之一就是易於組合和模組化,所以我們會維持一貫主題 分離關注點,直到真的 ''運行" 之前,把所有計算、轉換當成某種表達式來 "組合" 起來。

就讓我們從簡單的使用案例開始吧!

推導出 library 中需要哪些核心 API

假設我們有個 sum function 加總 Seq,

def sum(ints: IndexedSeq[Int]) =
  ints.foldLeft(0)(_ + _)

如果我們不想依序的加總,我們可以用 divide-and-conquer (分治) 算法來加速計算,

def sum(ints: IndexedSeq[Int]): Int =
  if ints.size <= 1 then
    ints.headOption.getOrElse(0)
  else
    val (l, r) = ints.splitAt(ints.size / 2)
    sum(l) + sum(r)

sum(l) 和 sum(r) 是我們可以進行平行運算的地方,所以我們可以定義 Par[A] 這個容器型態來表示它可能會在不同執行緒上運行,然後將欲回傳的屬性表示 A,以此處來說就是 Int,最後我們得取得運行結果,所以我們需要以下 2 個 function:

  • def unit[A](a: => A): Par[A] 接受還沒 evaluate (運行) 的 call-by-name 表達式參數,然後回傳 Par,表示它可能會在不同執行緒上運行。

  • def get[A](a: Par[A]): A 從平行化運行的 Par 中取得結果。

根據我們定義的 type 修改後的程式如下。

def sum(ints: IndexedSeq[Int]): Int =
  if ints.size <= 1 then
    ints.headOption.getOrElse(0)
  else
    val (l, r) = ints.splitAt(ints.size / 2)
    val sumL: Par[Int] = Par.unit(sum(l))
    val sumR: Par[Int] = Par.unit(sum(r))
    Par.get(sumL) + Par.get(sumR)

為什麼我們不使用 java.lang.Thraed 呢?最大的原因是 Thread 的 start 和 join 並沒有回傳有意義的值,所以若我們要從 Runnable 取得結果時,勢必會發生 side effect。

public interface Runnable {
 public abstract void run();
}

public class Thread implements Runnable {
	public synchronized void start()
	public final void join()
}

在這裡,如果我們使用 Substitution Model 把 sumL 和 sumR 替換掉的話,雖然結果還是正確,但它失去平行化功能了,

Par.get(Par.unit(sum(l))) + Par.get(Par.unit(sum(r)))

代表 unit 在給 get 當參數用時,有著 side effect,我們不能直接內嵌 unit 進去,因為 get 得等待 Par 運行完成然後取得結果,

所以看起來我們要避免調用 get,或者減少調用次數,在最終階段才調用,且我們也想要 Par 型態是有能力組合異步計算,而不用等待執行緒完成,

或許我們可定義一個新 function map2 來嘗試解決,

def sum(ints: IndexedSeq[Int]): Par[Int] =
  if ints.size <= 1 then
    Par.unit(ints.headOption.getOrElse(0))
  else
    val (l, r) = ints.splitAt(ints.size / 2)
    Par.map2(sum(l), sum(r))(_ + _)

但此時又有新的問題了,如果我們細部拆解調用順序,如下圖,

可以發現因為 scala function 預設是 strict 的關係,evaluate 參數的順序是由左到右,所以 sum(r) 不會馬上執行,而是要等到 sum(l) 做完才會輪到 sum(r),看起來我們得讓 map2 lazy,而且能立即把 2 個參數平行化運行。

Par.map2(Par.unit(sum(l)), Par.unit(sum(r)))(_ + _)

明確的 fork

但這樣真的好嗎?如果是以下程式平行化運行好像沒什麼太意義,我們真的需要分隔 logical thread (邏輯執行緒) 來運行嗎?

Par.map2(Par.unit(1), Par.unit(1))(_ + _)

這也點出了另一個問題,我們沒有選項,能明確讓程式知道我們真的要把平行化運行從主執行緒 fork 出來,因此我們可以在定義一個 fork function 表達 fork 的決心,

def fork[A](a: => Par[A]): Par[A]

有了 fork,我們可以讓 map2 保持 strict,最後我們的 sum 會長的下面這樣。

def sum(ints: IndexedSeq[Int]): Par[Int] =
  if ints.size <= 1 then
    Par.unit(ints.headOption.getOrElse(0))
  else
    val (l, r) = ints.splitAt(ints.size / 2)
    Par.map2(Par.fork(sum(l)), Par.fork(sum(r)))(_ + _)

現在來看一下之前的 unit function 要 strict 還是 lazy,因為有 fork function 的關係,我們可以讓 unit 變為 strict,然後使用 fork 和 unit 來實作 lazyUnit function,

def unit[A](a:  A): Par[A]
def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

最後來看一下 fork 該如何實現,因為 Par 只是個容器型態,表示我們會用多執行緒運行程式,

如果 fork 的實作會立即建立多執行緒,我們會失去控制平行化策略的彈性,從 基於介面而非實現開發 這個觀點來看,我們需要一個 function 來負責啟動,倘若不立即建立多執行緒,那我們就更需要一個 function 來啟動,

所以我們可以把 get 改名為 run,來命令我們的 library 開始運行,並取回運行結果。

def run[A](a: Par[A]): A

我們的 library 需要哪些核心 API 已經推導的差不多了,明天繼續吧!


上一篇
純粹的 functional 狀態 (3)
下一篇
Purely Function 的平行化 (2)
系列文
用 Scala 3 寫的 Functional Programming 會長什麼樣子?30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言